Skip to content

[3/5][core]refactor communication layer: PR 3 of 5, all other models in non async mode #3719

Merged
hsliuustc0106 merged 19 commits into
vllm-project:mainfrom
natureofnature:pr/refactor/pr3
May 28, 2026
Merged

[3/5][core]refactor communication layer: PR 3 of 5, all other models in non async mode #3719
hsliuustc0106 merged 19 commits into
vllm-project:mainfrom
natureofnature:pr/refactor/pr3

Conversation

@natureofnature
Copy link
Copy Markdown
Contributor

@natureofnature natureofnature commented May 19, 2026

PLEASE FILL IN THE PR DESCRIPTION HERE ENSURING ALL CHECKLIST ITEMS (AT THE BOTTOM) HAVE BEEN CONSIDERED.

Purpose

For the Refactor of commnication layer, there are going to be 5 PRs in total.
Refer to PR #1555 as the first PR.
PR:#2677 as the second PR.
There are going to be 3 remaining PRs .

  • 1/5: PR 1555, merged, basic infra
  • 2/5: Refactor on Qwen3 Omni non async mode
  • 3/5: Refactor of other models using non async mode (in progress)
  • 4/5: Refactor async execution workflow
  • 5/5: Remove redundant codes and add docs.

Scope

The PR3 closure migrated these 10 (arch, stage) transitions. Payload
shape
is the actual dict layout shipped over the connector (verified
against 81208065); per-builder details live in §8.

Arch Stage Group Transition Payload shape (top-level keys)
Qwen3OmniMoeForConditionalGeneration talker (PR1) thinker → talker OmniPayload-nested (hidden_states.*, embed.prefill, ids.*)
Qwen3OmniMoeForConditionalGeneration code2wav (PR1) talker → code2wav {codes.audio, meta.finished}
Qwen2_5OmniForConditionalGeneration talker B thinker → talker OmniPayload-nested: hidden_states.{output, output_shape}, embed.{prefill, prefill_shape}, ids.{prompt, output}
Qwen2_5OmniForConditionalGeneration code2wav B talker → code2wav {codes.audio (boundary-stripped tokens), meta.finished}
CovoAudioForConditionalGeneration code2wav B fused → code2wav {codes.audio (filtered tokens), meta.finished}
MiMoAudioModel code2wav B fused → code2wav {codes.audio (col-major flat tokens), meta.finished}
Qwen3TTSCode2Wav code2wav C talker → code2wav {codes.audio (ref-prefixed, flattened), meta.finished}codes.ref / ref_code_len are producer-side intermediates only, folded into codes.audio before send
CosyVoice3Model cosyvoice3_code2wav D text → code2wav {embed.{speech_token, speech_feat, embedding}, meta.finished}
DyninOmniForConditionalGeneration token2image B token2text → token2image Hybrid: normalized prompt-metadata fields flattened at top level (speaker, language, detok_id, …) + codes.audio (token_ids) + meta.finished
DyninOmniForConditionalGeneration token2audio B token2image → token2audio same shape, second hop

Group A — protect-only (no migration): archs that have a multi-stage pipeline but stay on the legacy additional_information data plane. PR3 doesn't migrate them; it only generalizes the new gate plumbing so
their absence from _OMNI_CONNECTOR_INIT_ARCHS and _FULL_PAYLOAD_INPUT_STAGES doesn't break init. Verification = topology guard tests that prevent accidental future migration.
Members: MingFlash.

Group B — straightforward migration (1-D codec or hidden-state tensor): producer emits per-step pooler output, accumulator either keeps the latest (hidden-state edges) or CONCATs along dim 0 (codec
edges), builder packages into nested OmniPayload-style dict. No producer-side splicing of multiple accumulator sources, no out-of-band metadata channels. Verification = e2e on each member individually.
Members: Qwen2_5Omni (both transitions), CovoAudio, MiMoAudio, DyninOmni (both hops).

Group C — splice + reshape: producer accumulates two independent streams (generated codec + reference codec) plus an integer metadata field, then fuses them producer-side before shipping. The fused
payload is still single-keyed (codes.audio), but the splicing logic (transpose, codebook-major flatten, ref prepend) means correctness depends on accumulator ordering, length contracts, and a per-codebook filter — all PR3-new code. Verification = e2e plus a dedicated _filter_audio_codes_qwen3_tts unit test on edge cases (negative,
all-zero, out-of-range).
Members: Qwen3TTS.

Group D — partial migration with explicit exception: arch has multiple sub-paths and only the prompt-conditioning sub-path moves to the connector. The sync codec sub-path stays on legacy as a
deliberate scoped exception。 The connector edge here carries embed.* prompt tensors only — not codec tokens. Verification = e2e on both zh and en prompts to catch the producer/consumer divergence under realistic content.
Members: CosyVoice3.

Architecture

image
sequenceDiagram
    autonumber
    participant Sched as Stage N+1 Scheduler
    participant Coord as Coordinator
    participant RunP as Stage N Runner
    participant Acc as accumulator
    participant Conn as Connector (shm)
    participant RunC as Stage N+1 Runner
    participant Model as Stage N+1 model.forward

    Note over Sched,Coord: A fresh request lands on Stage N+1.
    Sched->>Coord: process_pending_full_payload_inputs(...)
    Coord->>Coord: WAITING -> WAITING_FOR_INPUT
    Coord-->>Sched: pending_input_registrations=[OmniChunkRecvHandle(...)]
    Sched->>RunC: SchedulerOutput (with handles)
    RunC->>Conn: register_chunk_recv(req_id, external_req_id)

    Note over RunP,Acc: Stage N executes its forward steps.
    loop per token/step
        RunP->>Acc: accumulate_full_payload_output(req_id, pooler_output)
    end
    RunP->>Conn: flush_full_payload_outputs({req_id_finished})
    Conn->>Conn: SIP._full_payload(pooling_output, request) -> payload dict
    Conn-->>RunC: bg thread delivers payload to Stage N+1 connector

    Sched->>RunC: next schedule() cycle
    RunC->>RunC: recv_full_payload_inputs(scheduler_output)<br/> -> stage_recv_req_ids
    RunC-->>Sched: OmniConnectorOutput(request_metadata, recv_ids)
    Sched->>Coord: _consume_pending_connector_output(...)
    Coord->>Coord: WAITING_FOR_INPUT -> WAITING (req now schedulable)

    Note over Sched,Model: Subsequent cycle schedules the req for Stage N+1 forward.
    Sched->>RunC: SchedulerOutput (req now runnable)
    RunC->>Model: forward(model_intermediate_buffer={req_id: payload})
    Model-->>RunC: outputs
Loading

Change of lines

Source: git diff --numstat upstream/main..HEAD on pr/refactor/pr3 branch
(rebased on top of upstream/main 89f88195, 10 commits, 36 files).

Top-level buckets

Bucket +adds -dels files
Production (vllm_omni/) +1718 -224 28
Tests (tests/) +1202 -58 8
Total +2920 -282 36

Production breakdown

Sub-bucket +adds -dels files % of prod adds
stage_input_processors/*.py +1210 -43 8 70%
Other production (scheduler / worker / mixin / output / pipeline) +504 -181 17 29%
yaml configs (stage_configs/, deploy/) +4 0 3 0.2%

Stage input processors (8 files, +1210 / -43)

File +adds -dels Scope
qwen2_5_omni.py +331 0 real producer + token-only
qwen3_tts.py +203 -3 per-arch producer + negative-codec filter
cosyvoice3.py +148 -1 legacy text2flow_token_only + SIP (stage input processor) prompt-prefix strip
mimo_audio.py +132 0 per-arch producer + all-zero codec rows return empty
dynin_omni.py +125 0 _build_full_payload + nested codes.audio / meta.finished
qwen3_omni.py +100 -32 REPLACE keys + thinker_emb/hid trim refinement
ming_flash_omni.py +95 0 per-arch SIP builders + arch-not-in-allowlist note
covo_audio.py +76 -7 per-arch SIP builders

Test Plan

test-ready, test-merge, test-nightly

Test Result

Screenshot from 2026-05-26 22-39-28

The failed case Entrypoint Test with H100 (timeout) happens in many other CI (CI index 10365, 10366, 10338, 10325, 10326...)

Qwen3 TTS performance comparison (vs commit ec94e83)

RPS: Request throughput
Audio: Audio throughput

Case Phase Load N RPS Baseline RPS Current RPS Δ ↑ Audio Baseline Audio Current Audio Δ ↑ Mean E2E Baseline Mean E2E Current E2E Δ ↓ P99 E2E Baseline P99 E2E Current P99 Δ ↓ Mean RTF Baseline Mean RTF Current RTF Δ ↓
base voice_clone latency c1 20 1.3319 1.3378 +0.44% 5.9828 6.0094 +0.44% 750.5 747.2 -0.44% 1065.7 1058.9 -0.63% 0.1701 0.1689 -0.71%
base voice_clone throughput c8 80 6.1399 6.6868 +8.91% 26.2172 28.2651 +7.81% 1266.7 1169.5 -7.68% 2186.8 1854.8 -15.18% 0.3033 0.2798 -7.73%
base voice_clone throughput c16 128 10.1545 10.0588 -0.94% 43.5122 43.0515 -1.06% 1490.8 1513.4 +1.52% 2289.9 2370.5 +3.52% 0.3591 0.3661 +1.94%
base voice_clone throughput c64 128 10.5650 10.2868 -2.63% 44.8683 44.4067 -1.03% 4658.5 4856.8 +4.26% 6731.3 7076.8 +5.13% 1.1712 1.2058 +2.96%
custom default_voice latency c1 20 1.2970 1.2811 -1.23% 6.9521 6.8665 -1.23% 770.7 780.3 +1.24% 1116.4 1117.7 +0.12% 0.1440 0.1459 +1.26%
custom default_voice throughput c8 80 7.6575 7.5575 -1.31% 42.5066 42.3370 -0.40% 989.2 1006.9 +1.78% 1372.4 1529.3 +11.43% 0.1788 0.1805 +0.94%
custom default_voice throughput c16 128 9.6700 9.4300 -2.48% 54.3453 53.6568 -1.27% 1592.0 1615.0 +1.45% 2259.9 2071.9 -8.32% 0.2853 0.2868 +0.54%
custom default_voice throughput c64 128 9.6552 9.3760 -2.89% 53.8642 53.2733 -1.10% 5124.7 5305.3 +3.52% 7015.8 7431.7 +5.93% 0.9420 0.9670 +2.65%
custom default_voice stress rate=2.0 100 1.9626 1.9636 +0.05% 11.0580 10.9084 -1.35% 876.6 866.9 -1.10% 1486.3 1240.4 -16.54% 0.1557 0.1561 +0.23%
custom voice_design latency c1 20 1.4540 1.4252 -1.98% 6.8687 6.8978 +0.42% 687.5 701.4 +2.02% 1024.3 1022.5 -0.18% 0.1460 0.1452 -0.55%
custom voice_design throughput c8 80 9.0068 8.8036 -2.26% 43.0703 42.2571 -1.89% 848.1 855.9 +0.91% 1327.2 1448.4 +9.13% 0.1781 0.1790 +0.50%
custom voice_design throughput c16 128 11.5008 11.0865 -3.60% 53.4641 52.3491 -2.09% 1331.9 1354.4 +1.69% 2041.7 2104.1 +3.06% 0.2945 0.2928 -0.60%
custom voice_design throughput c64 128 10.9401 11.1982 +2.36% 51.5417 52.4357 +1.73% 4419.9 4456.8 +0.83% 6328.8 6110.5 -3.45% 0.9836 0.9991 +1.57%
custom voice_design stress rate=2.0 100 1.9700 1.9683 -0.09% 9.2780 9.0464 -2.50% 729.4 715.2 -1.95% 1238.3 1296.7 +4.72% 0.1552 0.1560 +0.53%

Notes: The higher the better, the lower the better。
Summary:On average RPS Δ ↑ = -0.55%,average Audio throughtput Δ ↑ = -0.25%,no obvious performance degration


Essential Elements of an Effective PR Description Checklist
  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan. Please provide the test scripts & test commands. Please state the reasons if your codes don't require additional test scripts. For test file guidelines, please check the test style doc
  • The test results. Please paste the results comparison before and after, or the e2e results.
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model. Please run mkdocs serve to sync the documentation editions to ./docs.
  • (Optional) Release notes update. If your change is user-facing, please update the release notes draft.

BEFORE SUBMITTING, PLEASE READ https://github.com/vllm-project/vllm-omni/blob/main/CONTRIBUTING.md (anything written below this line will be removed by GitHub Actions)

Copy link
Copy Markdown
Collaborator

@hsliuustc0106 hsliuustc0106 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a [WIP] PR and part of a multi-PR refactor series (3/5), I'll hold off on a full blocker scan. Ready for full review when the WIP label is removed and test results are added to the PR description.

@natureofnature natureofnature marked this pull request as ready for review May 21, 2026 09:48
@chatgpt-codex-connector
Copy link
Copy Markdown

Codex usage limits have been reached for code reviews. Please check with the admins of this repo to increase the limits by adding credits.
Credits must be used to enable repository wide code reviews.

@natureofnature
Copy link
Copy Markdown
Contributor Author

@hsliuustc0106 PTAL

Comment thread vllm_omni/core/sched/output.py Outdated
@hsliuustc0106 hsliuustc0106 added the merge-test label to trigger buildkite merge test CI label May 21, 2026
Comment thread vllm_omni/worker/omni_connector_model_runner_mixin.py Outdated
Comment thread vllm_omni/core/sched/omni_scheduler_mixin.py Outdated
Comment thread vllm_omni/model_executor/stage_input_processors/qwen2_5_omni.py
@Gaohan123 Gaohan123 added this to the v0.22.0 milestone May 22, 2026
@hsliuustc0106 hsliuustc0106 added ready label to trigger buildkite CI nightly-test label to trigger buildkite nightly test CI labels May 23, 2026
@npuichigo
Copy link
Copy Markdown

Any updates here

@natureofnature
Copy link
Copy Markdown
Contributor Author

Any updates here

The servers I'm using have been down. Need more time than expected to do the verification.

@natureofnature natureofnature changed the title [3/5] [WIP][core]refactor communication layer: PR 3 of 5, all other models in non async mode [3/5][core]refactor communication layer: PR 3 of 5, all other models in non async mode May 26, 2026
@natureofnature
Copy link
Copy Markdown
Contributor Author

Any updates here

I think this PR is close to being completed. Looking forward to your suggestions. @npuichigo

Copy link
Copy Markdown
Collaborator

@hsliuustc0106 hsliuustc0106 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR #3719 Review: Communication Layer Refactor (3/5)

Verdict: COMMENT — well-structured refactor with thorough test evidence. Flagging a handful of quality/maintenance concerns below; none are blocking.

What I validated

  • Architecture alignment: The producer-side custom_process_next_stage_input_func gate correctly replaces the old should_accumulate_qwen3_omni_full_payload_output per-arch check. The consumer-side _FULL_PAYLOAD_INPUT_STAGES whitelist aligns with the _OMNI_CONNECTOR_INIT_ARCHS allowlist in both runners. Missing a stage from the coordinator whitelist produces a documented hang scenario (good).

  • _free_request try/finally refactor (omni_ar_scheduler.py): The unconditional finally: _free_input_coordinator_request is safe because the method is a no-op when coordinator is None. This eliminates 3 separate early-return cleanup sites — genuine improvement.

  • OmniChunkRecvHandle concrete typing: Replacing list[Any] with list[OmniChunkRecvHandle] for pending_input_registrations is correct for msgspec serialization across IPC.

  • register_chunk_recv None fallback: Fixing external_req_id=None key collisions is a real bugfix.

  • CosyVoice3 token_offset_tokens: Correctly threaded from metadata through to cosyvoice3_code2wav.forward() — was hardcoded to 0 before.

  • Input timeout safety net: _process_pending_input_timeouts with configurable VLLM_OMNI_INPUT_WAIT_TIMEOUT_S is a good defense-in-depth against stuck consumers.

  • Test evidence: PR includes +1202 lines of tests across 8 files, plus a screenshot of e2e results covering all migrated models. The Group A/B/C/D classification is clear and well-documented.

Non-blocking concerns

  1. Duplicated _OMNI_CONNECTOR_INIT_ARCHS set between gpu_ar_model_runner.py and gpu_generation_model_runner.py. These must stay in sync manually. Consider extracting to a shared constant (e.g., in omni_connector_model_runner_mixin.py or a central registry) in PR 4 or 5.

  2. Local _ensure_list shadow in qwen2_5_omni.thinker2talker_full_payload (line ~295 in the new code) defines a nested _ensure_list that accesses ._x — a non-standard attribute pattern not used by any other _ensure_list in the codebase. If this is intentional (handling a specific Request subclass), a brief comment explaining the ._x access would help future readers.

  3. _resolve_full_payload_replace_keys uses runtime module import via importlib.import_module. This works but is fragile if module names change. The caching mitigates performance impact. Worth a TODO comment for a future structural approach.

  4. _FULL_PAYLOAD_REPLACE_KEYS declared (as empty frozenset()) in every stage_input_processor module — even those that never use REPLACE semantics. This is fine for consistency but could confuse readers into thinking these are actively configured. Consider a docstring or only declaring it where non-empty.

  5. flush_full_payload_outputs during cleanup (omni_connector_model_runner_mixin.py:cleanup_finished_request) catches and logs but suppresses exceptions. The comment explains why, but a stuck flush could mask connector corruption. The current approach is pragmatic; just flagging for awareness.

Summary

This is a clean, well-documented refactor that generalizes the Qwen3-Omni connector infrastructure to 8 additional model architectures. The Group A/B/C/D classification is clear, the test coverage is substantial, and the safety improvements (try/finally cleanup, input timeout, None fallback) are genuine fixes. The duplicated arch sets and module-level _ensure_list shadow are minor maintenance items for a follow-up.

# arches below). Consumer-wait stages must be registered
# separately as `(model_arch, model_stage)` tuples in
# `omni_scheduling_coordinator._FULL_PAYLOAD_INPUT_STAGES`;
# forgetting that produces a Stage-1 hang on the consumer.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This _OMNI_CONNECTOR_INIT_ARCHS set is duplicated identically in gpu_generation_model_runner.py. If a new arch is added to one but not the other, the producer/consumer init paths silently diverge. Consider extracting to a shared constant in omni_connector_model_runner_mixin.py or a central registry in a follow-up PR.

@@ -684,26 +736,40 @@ def _should_accumulate_full_payload_output(self) -> bool:
_custom_process_func, both of which are set at init time. Avoid
the per-step dynamic import inside the model decode loop.
"""
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getattr(self, '_omni_connector', None) is None early-return here silently skips accumulation for stages without a connector, but doesn't log. If a stage is supposed to have a connector but init failed silently, this would mask it. A logger.debug_once(...) when this branch fires might help future debugging.

@@ -729,6 +795,43 @@ def _materialize_full_payload_entry(entry):
output[k] = tensors[0] if len(tensors) == 1 else torch.cat(tensors, dim=0)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The runtime importlib.import_module to resolve _FULL_PAYLOAD_REPLACE_KEYS works and the caching is correct, but this creates an invisible coupling: renaming a stage_input_processor module silently changes the REPLACE-key set to empty (correct but fragile). Consider adding a debug log on first resolution so mismatches are diagnosable.

finish-reason-aware stop-row trim: vLLM v1 appends the sampled
token to ``output_token_ids`` before ``check_stop``, so a request
that finished via ``FINISHED_STOPPED`` has one extra accumulated
hidden-state row that the talker must not consume. Max-token
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This local _ensure_list accesses ._x — a non-standard attribute pattern not used by any other _ensure_list variant in the codebase. If this handles a specific Request subclass (e.g., CachedRequestState), a brief comment explaining what ._x represents would help maintainers.

if input_coordinator is not None:
input_coordinator.free_finished_request(request_id)

# ------------------------------------------------------------------ #
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The try/except ValueError around the env var parse is good defensive coding. However, a silently-ignored bad value means a typo in VLLM_OMNI_INPUT_WAIT_TIMEOUT_S falls back to 300s with no indication. Consider a logger.warning on the ValueError path.

if self._should_transfer_kv_for_request(request_id):
already_triggered = request_id in self.transfer_triggered_requests
is_active = request_id in self.active_kv_transfers

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The try/finally pattern ensuring _free_input_coordinator_request always runs is a genuine improvement over the previous 3 separate early-return cleanup sites. Good fix.

additional_information: dict[str, dict | None]


@dataclass
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concrete OmniChunkRecvHandle typing here is correct for msgspec serialization. The external_req_id: str | None = None field properly matches the None-fallback fix in register_chunk_recv.

@@ -927,11 +1048,11 @@ def register_chunk_recv(self, request: Any) -> None:
if self._stage_id == 0:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ext = getattr(request, 'external_req_id', None); if ext is not None pattern correctly prevents None_<stage>_<chunk> key collisions. This is a real bugfix.

# Filter invalid frames: zero-padded (EOS) and frames containing
# out-of-range values (e.g. stop_token_id=2150 exceeds codebook_size=2048).
# Filter invalid frames: zero-padded (EOS), out-of-range values (e.g.
# stop_token_id=2150 exceeds codebook_size=2048), and negative
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The (audio_codes >= 0).all(dim=1) addition to the valid mask is a real correctness fix — negative sentinels from async scheduling would previously pass the filter and corrupt the codec sequence.

@hsliuustc0106 hsliuustc0106 added the high priority high priority issue, needs to be done asap label May 28, 2026
…giene, IPC type safety, DRY.

Five reviewer items bundled (file:line per item in the original review):

   + standard-freeing block in try/finally so the input_coordinator
   entry is pruned along every return path. Mirrors the pattern
   already in omni_generation_scheduler._free_request. Four inline
   "if self.input_coordinator is not None: self._free_input_coordinator_request(...)"
   calls collapsed into a single finally clause.

   self._omni_connector_initialized = True at the end of init.
   OmniGPUModelRunner._update_states gates cleanup_finished_request on
   this explicit flag instead of probing the private
   "_request_ids_mapping" attribute name. Removes the implicit
   "is the mixin done initialising" contract.

   list[OmniInputRegistration] (new minimal dataclass in
   vllm_omni/core/sched/output.py carrying request_id + external_req_id
   only - the two fields register_chunk_recv actually consumes).
   Replaces the prior list[Any], which msgspec falls back to JSON-ish
   serialisation for under PD-disagg / multi-node executor IPC.
   Wire payload also drops by ~one Request struct per pending
   registration. Tests stay green via duck-typed attribute access.

   capture boilerplate duplicated between omni_ar_scheduler and
   omni_generation_scheduler:
   - _consume_pending_connector_output(model_mode) -- drains
     _latest_omni_connector_output at top of schedule()
   - _capture_omni_connector_output(model_runner_output, model_mode) --
     stashes omni_connector_output at tail of update_from_output()
   - _wrap_omni_scheduler_output(base, **extras) -- builds
     OmniSchedulerOutput from a base SchedulerOutput
   AR + generation schedulers each lose 3 copy-pasted blocks.

Verified on H800 dev environment with --run-level full_model -m
"full_model and H800 and omni": test_one_word_prompt_001 +
test_speaker_002 ([default] and [async_chunk]) 4 passed in 6:32.

Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Three squashed bugfix commits all targeting qwen3_omni
thinker2talker_full_payload's row-counting logic for the worker
connector data plane:

1. Length-aware trim (orig c805e487): switch from unconditional [:-1]
   trim to a target_rows = len(all_token_ids) computation, so max-token
   finishes (which do not append a stop-emission row) are not
   over-trimmed.  Fixes long-output regression on test_mix_to_text_audio
   from BK 9702 main build.

2. Drop output_token_ids hoist (orig 0f14863a): surgical revert of an
   unused hoist left behind by the prior trim commit; no functional
   change.

3. Finish-reason-aware trim (orig 6d95f8bd): add a stop_emission_drop
   subtraction so FINISHED_STOPPED requests still drop their extra
   accumulated hidden-state row.  Codex P1 review on the prior commit
   identified this as a regression on short stop-finished outputs
   (spurious-phoneme on test_speaker_002).  Detection: primary via
   request.status; fallback heuristic via last-token-in-stop-set
   when worker-side CachedRequestState has no .status field.

Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Consolidates the two infrastructure commits that prepared PR3 for per-model
sync-data-plane migration:

- 6d4b4890 [Phase 2a] Structural gate via `_is_sync_input` marker
            (drop hard-coded `model_arch == Qwen3OmniMoeForConditionalGeneration`
             in `omni_scheduling_coordinator.uses_full_payload_input_coordinator`).
            Marker is set on the consumer-side `*_token_only` builder in
            each model's SIP module; the consumer-side scheduler gate
            reads it via the resolved `custom_process_input_func` callable.
- d7bc85fa [Phase 2d] REPLACE-keys accumulator hook + arch-gate cleanup.
            Per-model `_FULL_PAYLOAD_REPLACE_KEYS: frozenset[str]` declared
            in the SIP module; the worker accumulator
            (`omni_connector_model_runner_mixin.accumulate_full_payload_output`)
            looks it up via `proc.__module__` and uses REPLACE semantics for
            those keys (default is CONCAT).  Used by models where a key
            carries the full result so far rather than per-step deltas.

Net effect:
- `core/sched/omni_scheduling_coordinator.py`: marker-based structural gate
- `worker/omni_connector_model_runner_mixin.py`:
    `accumulate_full_payload_output`, `_resolve_full_payload_replace_keys`,
    `should_accumulate_full_payload_output`, related arch-gate cleanup
- `model_executor/stage_input_processors/qwen3_omni.py`: declares
    `_FULL_PAYLOAD_REPLACE_KEYS` for qwen3_omni's `talker2code2wav` keys
- Per-stage `custom_process_input_func` and `sync_process_input_func`
  selection plumbing remains in `config/stage_config.py:_select_processor_funcs`.

After this commit, per-arch SIP modules can declare:
  - a `*_token_only` builder (sync_process_input_func, marked `_is_sync_input = True`)
  - a `*_full_payload` builder (custom_process_next_stage_input_func)
  - optional `_FULL_PAYLOAD_REPLACE_KEYS` for REPLACE semantics
and the worker connector + scheduler coordinator handle the rest uniformly.

Per-arch migrations follow in 9 subsequent commits (covo_audio, dynin_omni × 2,
mimo_audio, qwen3_tts, cosyvoice3, ming_flash_omni, qwen2_5_omni × 2).

Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Squash of 9 commits that wire per-arch stage_input_processor (SIP)
builders for the worker-connector data plane, plus the coordinator
gate generalization that makes them reachable:

- [Pilot] covo_audio (Group B llm->code2wav): pilot impl exercising
  the new structural gate.
- dynin_omni Group B (both transitions): SIP builders + yaml wires
  for stage_configs.
- qwen2_5_omni Group B half (talker->code2wav).
- mimo_audio Group B (llm->code2wav).
- qwen3_tts Group C (talker->code2wav).
- cosyvoice3 Group D-ish (text->flow).
- ming_flash_omni Group D (thinker->talker).
- qwen2_5_omni Group A reduced to D-minimal (thinker->talker
  structural sync builder).
- Coordinator gate generalization (drops hard-coded Qwen3-only check
  in uses_full_payload_input_coordinator and replaces it with the
  _FULL_PAYLOAD_INPUT_STAGES (arch, stage) whitelist).  Tests in
  tests/core/sched/test_omni_scheduling_coordinator.py +
  tests/worker/test_omni_gpu_model_runner.py adjust to the new
  whitelist contract.

Each per-arch SIP commit adds a builder pair (`*_token_only` and
`*_full_payload`) and a pipeline.py wire; tests in
test_qwen3_omni_streaming_helpers.py cover the structural
expectations.

Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Squash of 5 [PR3 Block A] Enable commits that wire each migrated arch
into both the worker init allowlist (_BLOCK_A_INIT_ALLOWLIST in
gpu_ar_model_runner.py + gpu_generation_model_runner.py) and the
scheduler coordinator allowlist (_FULL_PAYLOAD_INPUT_STAGES in
omni_scheduling_coordinator.py), keeping the two in lockstep so a
gate-enabled stage always has a wired-up worker connector.

Archs activated (in commit order):

- qwen2_5_omni talker -> code2wav (Block A pilot for q25;
  thinker -> talker stays orchestrator-routed at this commit because
  the producer builder is still a no-op).  Also widens
  init_omni_connectors arch allowlist from Qwen3-only to a 7-arch
  frozenset.
- covo_audio fused_thinker_talker -> code2wav.
- mimo_audio fused_thinker_talker -> code2wav.
- qwen3_tts talker -> code2wav (Qwen3TTSTalkerForConditionalGeneration
  -> Qwen3TTSCode2Wav).
- cosyvoice3 cosyvoice3_talker -> cosyvoice3_code2wav.

After this commit each arch's Stage-1 receives the full-payload
delivery via the worker connector instead of via the orchestrator-
side additional_information path.  The producer builders themselves
were added in the previous "Per-arch SIP builders" commit.

Signed-off-by: natureofnature <wzliu@connect.hku.hk>
…ation + Codex review

Consolidates the PR3 closure work: prefix_caching state-leak fix,
qwen2_5_omni real producer plumbing, dynin connector data-plane
migration, and Codex review feedback.

`omni_connector_model_runner_mixin.py`: `flush_full_payload_outputs`
is invoked at the start of `cleanup_finished_request` to drain any
pending full_payload entry for the finishing request before the rest
of the cleanup runs.  An earlier unconditional pop of
`_pending_full_payload_send` raced with flush and broke audio
consumers; flush-then-cleanup is idempotent and safe for paths
without a downstream consumer.  Fixes
`test_thinker_prefix_caching[omni_server0]` state-leak regression.

`stage_input_processors/qwen2_5_omni.py`: `thinker2talker_full_payload`
is no longer a no-op.  The real producer-side packer reads
`pooling_output["hidden"]`, splits prefill/decode, applies a
stop-emission trim aligned with the legacy contract (mirrors
qwen3_omni; covers max-token finishes without losing a hidden row),
and builds an `OmniPayloadStruct` matching the field set that
`thinker2talker_token_only` writes into `additional_information`.

`deploy/dynin_omni_ci.yaml`: adds `custom_process_next_stage_input_func`
on Stage 0 (token2text) and Stage 1 (token2image) pointing at the
`_full_payload` builders.  Without this entry, `_load_custom_func`
finds no builder, `_should_accumulate_full_payload_output()` returns
False, and the producer never enqueues a connector message.

End-to-end connector flow observed at runtime:
  Stage-0 flush_full_payload_outputs(req_id) -> to_send=[req_id]
  Stage-0 send_full_payload_outputs payload_keys=[...
      'code_predictor_codes', ...]
  Stage-1 full_payload recv complete: payload_type=dict
  Stage-1 flush_full_payload_outputs ... -> to_send=[req_id]
  (Stage-2 recv likewise)

dynin's legacy consumer-side `custom_process_input_func` on Stage 1
and Stage 2 is retained (see DECISIONS.md D-017): the connector path
is primary, but `_bridge_tokens` is still wired to propagate
`additional_information` to the downstream `OmniTokensPrompt` -- a
propagation the scheduler-side rewrite (`metadata` ->
`request.prompt_token_ids`) does not yet do.  Offline t2s passes
through the pure connector pipeline; the online server test relies
on `request.additional_information` for response assembly, so the
legacy SIP stays until the scheduler is extended in a follow-up
(see PR4 direction in D-017).

`core/sched/omni_scheduling_coordinator.py` adds three (arch, stage)
pairs to `_FULL_PAYLOAD_INPUT_STAGES`:
- `(Qwen2_5OmniForConditionalGeneration, talker)` -- consumer gate
  for the newly-active qwen2_5_omni connector path.
- `(DyninOmniForConditionalGeneration, token2image)` and
  `(..., token2audio)` -- consumer gates that park the dynin Stage 1
  and Stage 2 requests in WAITING_FOR_INPUT until the upstream
  payload arrives.

`worker/gpu_ar_model_runner.py` and
`worker/gpu_generation_model_runner.py` add
`DyninOmniForConditionalGeneration` to `_BLOCK_A_INIT_ALLOWLIST` so
`init_omni_connectors` runs for dynin workers.

Codex review fixes:

- Issue 1 (q25 SIP trim heuristic): replace unconditional
  `output_token_ids[:-1] + h[:-1]` trim with a `stop_emission_drop`-
  based trim that mirrors qwen3_omni's contract.  Folded into the
  qwen2_5_omni SIP rewrite above.
- Issue 3 (external_req_id None fallback): `register_chunk_recv` and
  `_resolve_external_req_id` now treat an explicit `None` on the
  request struct as a fallback to the internal `request_id`,
  preventing recv-key collisions like `None_<stage>_<chunk>` across
  requests.
- Issue 4 (coordinator test): rewrite the positive case in
  `tests/core/sched/test_omni_scheduling_coordinator.py` to iterate
  `_FULL_PAYLOAD_INPUT_STAGES` so newly-whitelisted (arch, stage)
  pairs fire the gate by construction.  Remove the q2.5/talker entry
  from the negative cases (now whitelisted).

`stage_configs/dynin_omni.yaml` and
`stage_configs/dynin_omni_multiconnector.yaml`: remove a transiently-
added `sync_process_input_func` line.  The active deploy path is the
`deploy/dynin_omni_ci.yaml` rewrite above; the stage_configs siblings
are kept in sync to avoid drift.

Verified on H800 dev environment:

- dynin e2e (offline + online, full): 11 passed / 1 skipped / 0 fail
  in 19:12.  Connector flow verified by stage-0/1/2 flush+send+recv
  log triplet.
- qwen3_omni online_serving (3 tests, dynin diff applied):
  3 passed / 0 failed.
- Canonical CI sweep (test-ready.yml + test-merge.yml, 21 steps):
  21 pass / 0 fail in 77.7 min.

Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Layered fix for cosyvoice3 sync voice cloning: producer emits real
codec via the worker connector only when the talker reaches a stop
token; consumer overlays it into the placeholder input ids and trims
the prompt-conditioning mel only when that connector path is active.
Max-token fallbacks keep the legacy `additional_information` path
intact so non-cosyvoice3 archs and other terminal conditions are
unaffected.

Producer (model side, `models/cosyvoice3/cosyvoice3.py`)
-------------------------------------------------------
- New `build_pooler_payload(req_id, req_index, input_batch,
  sampled_token_ids, invalid_req_indices)` hook discovered via
  duck-typed `getattr` on the runner.  Returns
  `{"codes.audio": tensor}` only when the per-request codec history
  is non-empty AND the talker has emitted at least one
  stop-class token (id >= speech_token_size).  Mid-step or
  pre-finish polling returns `None` so the connector message is
  shipped exactly once at finish.
- `_pooler_codec_rows` keeps cumulative codec ids in
  `self._pooler_codec_history_by_req` and tracks two per-req
  sentinels:
    - `_pooler_codec_sampled_seen_by_req`: any in-vocab token
      observed for the request.
    - `_pooler_codec_sampled_finished_by_req`: a stop-class id was
      observed.
  Sampled path is preferred; on cold-start it falls back to
  `_pooler_output_history_from_input_batch` (vllm leaves the
  decoded slots at -1 under `prefer_model_sampler=True`, so this
  fallback is normally inert here but kept for resume paths).

SIP (`stage_input_processors/cosyvoice3.py`)
--------------------------------------------
- `text2flow_full_payload` reads `pooling_output["codes.audio"]`
  (flat dotted + nested fallback), tensor-wraps into
  `codes.audio`, and sets `meta.next_stage_prompt_len = len(token_ids)`
  for the overlay length contract.
- `_FULL_PAYLOAD_REPLACE_KEYS` adds `codes.audio` (per-step
  payload carries cumulative codec, not delta).
- `text2flow_token_only` keeps the legacy `additional_information`
  packing (multimodal_output + `ids.prompt`) so the orchestrator
  still has a usable fallback when `codes.audio` is not shipped.

Runner dispatch (`worker/gpu_ar_model_runner.py`)
-------------------------------------------------
- `_attach_model_pooler_payload` invokes the model hook and merges
  returned keys via `_pooler_payload_has_key` (handles both flat
  dotted and nested layouts).  Non-cosyvoice3 archs fall through
  unchanged.
- `_output_token_ids_for_model_sampler` now trims at the first -1
  per request so the model sampler never sees placeholder slots.

Consumer overlay (`worker/gpu_generation_model_runner.py`)
----------------------------------------------------------
- `_overlay_full_payload_input_ids` runs before each generation
  step (non-async-chunk path).  For each scheduled request, looks
  up the connector payload via `model_intermediate_buffer`, reads
  `_payload_audio_codes`, flattens to a 1-D tensor, and copies it
  into the placeholder slots in `input_ids`.  Length mismatch is a
  loud RuntimeError (drift catches misaligned producer + scheduler
  next_stage_prompt_len contract).

code2wav trim (`models/cosyvoice3/cosyvoice3_code2wav.py`)
---------------------------------------------------------
- Sync `forward()` now accepts `token_offset_tokens: int = 0` and
  threads it through to `_forward_mel`; the model-side caller in
  `cosyvoice3.py` passes `speech_token.shape[1]` only when
  `payload.codes is not None and payload.codes.audio is not None`,
  i.e., the request actually traveled the connector path.  Legacy
  fallback continues to call `forward()` with the default (0) so
  pre-existing en_001 behavior is preserved.

Tests
-----
- `tests/model_executor/models/cosyvoice3/test_cosyvoice3_model_helpers.py`
  extends with hook contract cases (sampled-vs-history priority,
  finish gating, cache reuse).
- `tests/model_executor/models/cosyvoice3/test_cosyvoice3_components.py`
  covers the new sentinel sets.
- `tests/worker/test_omni_gpu_model_runner.py` adds dispatch +
  dotted-key resolution coverage for the new hook.

Verified on H800 dev environment with `--run-level full_model -m
"full_model and tts"`:

  * voice_clone_zh_001 (sync, connector path active):
      payload_keys=["codes", "embed", "meta"] code_len=292
      similarity=0.903 PASS
  * voice_clone_en_001 (sync, legacy fallback when talker hits
    max-tokens without stop):
      payload_keys=["embed", "meta"] code_len=None
      similarity=0.963 PASS

Follow-up (not gating): `_pooler_codec_history_by_req`,
`_pooler_codec_sampled_seen_by_req`, and
`_pooler_codec_sampled_finished_by_req` are not yet pruned in
`cleanup_finished_request`; long-running multi-request servers may
accumulate per-req entries.  CI is unaffected (per-test server).

Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Address issue on 809f6e1 (v5):
- High #1: build_pooler_payload received `out_idx` but sampler_output/invalid_req_indices index by input_batch row.
- High vllm-project#2: gpu_generation_model_runner._overlay_full_payload_input_ids was CosyVoice3-specific in the common runner.
- Medium vllm-project#3: cosyvoice3._pooler_output_history_from_input_batch didn't stop at -1 placeholder.

Resolution: drop the connector codec path for CosyVoice3 sync and deliver codec via legacy `additional_information`, strip prompt/reference prefix at the SIP layer, and gate code2wav mel-trim on the talker-prefill offset only when a speech-stop token was seen.

Source changes:
- gpu_ar_model_runner.py: remove build_pooler_payload hook + _attach_model_pooler_payload + _pooler_payload_has_key.
- gpu_generation_model_runner.py: remove _flatten_audio_codes_to_tensor + _overlay_full_payload_input_ids + its call site.
- cosyvoice3.py (model): remove build_pooler_payload + _pooler_codec_rows + _pooler_output_history_from_input_batch + _pooler_sampled_token_ids (and three per-req caches: _pooler_codec_history_by_req, _pooler_codec_sampled_seen_by_req, _pooler_codec_sampled_finished_by_req). code2wav.forward token_offset_tokens now reads `meta.talker_prefill_offset` (already a struct field used by qwen3_tts).
- SIP cosyvoice3.py: text2flow + text2flow_token_only strip the prompt token prefix and the prompt speech_token prefix from cumulative_token_ids; set meta.talker_prefill_offset only when raw output contains a speech-stop token. text2flow_full_payload no longer ships codes.audio (embed/meta only). Drop `codes.audio` from _FULL_PAYLOAD_REPLACE_KEYS.
- _to_token_id_list no longer filters negative ids (needed for stop-token detection on raw cumulative ids).

Side effects:
- v5's cosyvoice3 per-req cache leak is gone (no pooler hook → no accumulator).
- The pre-existing baseline `voice_clone_zh_001[cosyvoice3]` sim=0.00 (transcript "先") failure is fixed.

Verification on H800 GPU  with `--run-level full_model -m "full_model and tts"`:
- test_voice_clone_zh_001[cosyvoice3]: PASS sim=1.000 (baseline FAIL sim=0.00; v5 PASS sim=0.903)
- test_voice_clone_en_001[cosyvoice3]: PASS sim=0.963 (baseline PASS sim=0.946; v5 PASS sim=0.963)

Trade-off vs project_pr3_scope: CosyVoice3 sync codec stays on legacy additional_information; embed/prompt conditioning still ships via connector. Other PR3-migrated archs are unaffected (none consumed codes.audio via the removed overlay).

Signed-off-by: natureofnature <wzliu@connect.hku.hk>
…ix docstring accuracy

Comment-and-naming cleanup across PR3-touched files.

Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Signed-off-by: natureofnature <wzliu@connect.hku.hk>
@hsliuustc0106 hsliuustc0106 merged commit 3a347b1 into vllm-project:main May 28, 2026
7 of 9 checks passed
zengchuang-hw pushed a commit to zengchuang-hw/vllm-omni that referenced this pull request Jun 1, 2026
…in non async mode (vllm-project#3719)

Signed-off-by: natureofnature <wzliu@connect.hku.hk>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

high priority high priority issue, needs to be done asap merge-test label to trigger buildkite merge test CI nightly-test label to trigger buildkite nightly test CI ready label to trigger buildkite CI

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants